Distributed Python code

In this second tutorial we are looking into distributing Python code on High Performance Computing (HPC) clusters. These clusters are assemblies of compute nodes, each with multiple CPUs and cores, coordinated by a main scheduler that distributes tasks and organizes resources. The cluster we are going to connect to is located at Simon Fraser University and is named Cedar, after BC's official tree, the Western Red Cedar. The resources of this cluster, like those of many others, are handled by the Simple Linux Utility for Resource Management (SLURM) job scheduler. Therefore, we need to comply with its specifications when submitting jobs to this cluster.

For more information on how to connect to Cedar, check the Alliance Website here.

In this tutorial, we are going to:

  1. Run parallel Python code on a single node
  2. Run distributed Python code on multiple nodes

Both these applications are related to CPU operations, while we will only briefly touch on distributed GPU tasks at the end of the tutorial. Let's get started!

Single node vs Multi-node

Single-node calls are the easiest, because they don't require internode communication. Many Python libraries and modules for task parallelization that are designed for local machine usage can therefore also be used on single nodes (e.g. multiprocessing, joblib). However, they may not be suited to dispatching tasks across multiple cluster nodes.
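As a minimal sketch of the single-node case, the standard library's multiprocessing module can already spread independent tasks over the cores of one node (the function and the number of processes here are just illustrative; on a cluster you would match the pool size to the cores requested from SLURM):

```python
from multiprocessing import Pool

def square(x):
    # stand-in for an expensive, independent task
    return x * x

if __name__ == "__main__":
    # spread the work over 4 local processes
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

joblib's Parallel/delayed interface offers a similar pattern with a friendlier API, but both approaches stop at the boundary of a single machine.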

For multi-node CPU tasks, dask is a great Python package for scalable workloads, whether the requested resources fit within a single node or span multiple nodes. It is designed to work with big dataframes and arrays in a lazy fashion, and can be easily used to parallelize your Python code. Note, however, that more parallelization doesn't necessarily equate to greater performance and faster speed. Parallelization comes with additional overhead, which is only worth paying when the tasks can actually benefit from it.

Setting up dask to communicate with the cluster is as easy as calling a single function! To do that, we also need to install dask_jobqueue, which is the recommended package for communicating with HPC job schedulers (many are supported). The dask API is vast and offers many options to parallelize your code and submit tasks to a cluster. Take your time to familiarize yourself with it.

In this example, we are working with a sizeable array of 2.5 billion cells that we don't want to, or cannot, keep in memory, and we'll do some basic operations on it for demonstration. dask has a nice numpy-like interface, so it is pretty familiar to work with. Importantly, because the computations are done lazily, similarly to polars, we only obtain a result when we actively ask for it. That is, we dispatch all our expensive computations to the cluster and collect a single result at the end on our login node.

# ssh into your login node
# ...

# from within the login node, prepare the script
import dask.array as da
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# initialize an array of random floats and chunk it for blockwise operations
array = da.random.random((50_000, 50_000), chunks=(2000, 2000))

# do some computation
def add(x):
    return x + 1

# open connection with cluster in context manager to automatically close
# the connection upon completion
# the SLURM configuration is saved on your local disk, so if you modify
# it there, you don't need to request the resources again in this call
with SLURMCluster(
    account="def-accountname",
    cores=10,
    memory="10 GB"
) as cluster, Client(cluster) as client:
    # request 1 worker
    cluster.scale(1)
    # map the function to each chunk of the array
    array1 = array.map_blocks(add)
    array1.compute()
    # apply another function to `array` using the dask.array API
    array2 = array.sum()
    result = array2.compute()

In other cases, when calling functions like map or submit, dask returns a future, i.e. a promise that a computation will be carried out. In that case, we have to wait until our job passes the queue (pending state) before we can see the future's status as finished.
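As a sketch of this pattern, here is what submit looks like against a local client (the function is illustrative; on Cedar the Client would wrap the SLURMCluster from the example above, and the future would stay pending until the job leaves the queue):

```python
from dask.distributed import Client

def double(x):
    # trivial stand-in for a real workload
    return 2 * x

if __name__ == "__main__":
    # local client for illustration; swap in Client(cluster) on the HPC side
    with Client(n_workers=2, threads_per_worker=1) as client:
        future = client.submit(double, 21)
        # future.status moves from "pending" to "finished"
        result = future.result()  # blocks until the computation completes
        print(result)  # 42
```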

One other thing you may have noticed is that we can't really monitor how the computation is going, for example how much CPU or memory it is using. To solve this issue, dask offers a great dashboard to monitor the execution on the cluster. You need to establish an ssh tunnel to the cluster from your local workstation, so that once we call SLURMCluster (or any other jobqueue interface), we can start monitoring the workload distribution and progress. Ultimately, it would look something like the following.
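As a rough sketch, the tunnel could be opened like this (the login host and user name are placeholders for your own credentials; 8787 is dask's default dashboard port, but check the address printed by your cluster object):

```shell
# forward the dashboard port from the login node to your workstation
ssh -N -L 8787:localhost:8787 username@cedar.alliancecan.ca
# then open http://localhost:8787 in your local browser
```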

Workers being deployed

Cluster status

In case we require more computation power, we can further scale our cluster by requesting more workers using the scale method of cluster. This would also increase our wait time in the queue.
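A sketch of what that scaling could look like, reusing the same assumed account and resources as the earlier example (this fragment only runs on a SLURM system; adapt is dask's alternative to a fixed pool, growing and shrinking the worker count with the queued work):

```python
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

with SLURMCluster(
    account="def-accountname",
    cores=10,
    memory="10 GB"
) as cluster, Client(cluster) as client:
    # fixed pool: four workers, i.e. four SLURM jobs in the queue
    cluster.scale(4)
    # or adaptive scaling between 1 and 8 workers, driven by the workload
    cluster.adapt(minimum=1, maximum=8)
```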

Parallelize (almost) any Python code

What if the function I need to run is not available via the dask API, or my function comes from another package? How can I take advantage of HPC clusters to speed up my work? One way to achieve this is via the dask.delayed API: a wrapper around your functions that delays the computation until it is absolutely necessary. Delayed functions can be easily parallelized using the dask API. Let's take as an example the previous clustering approach with HDBSCAN.

import dask
from hdbscan import HDBSCAN

# define delayed function
@dask.delayed
def foo_delayed(data):
    clusterer = HDBSCAN().fit(data)
    return clusterer.labels_

def foo(data):
    clusterer = HDBSCAN().fit(data)
    return clusterer.labels_

The delayed version with 8 cores on a local machine runs at the same speed as the non-delayed version. In this case, the function can't really benefit from parallelization. This limitation is a result of dask not having control over the function's internals: all it does is parallelize the function call, and here there is only one call. So in this case we can't really expect any benefits. To actually gain speed, you would need to batch your data, if applicable, or run it on the GPU if you have enough VRAM.
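Where dask.delayed does shine is when there are many independent calls to parallelize, such as one per batch of data. A minimal sketch (the function here is a trivial stand-in, not HDBSCAN):

```python
import dask

@dask.delayed
def slow_square(x):
    # stand-in for an expensive, independent task (e.g. one batch)
    return x * x

# build a graph of independent tasks...
tasks = [slow_square(i) for i in range(8)]
# ...and execute them in parallel across the available workers
results = dask.compute(*tasks)
print(results)  # (0, 1, 4, 9, 16, 25, 36, 49)
```

Because each task is independent, dask can schedule them on as many cores (or workers) as are available, which is where the speedup comes from.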

import numpy as np
from dask.distributed import Client

# open local client (could be HPC cluster)
client = Client(n_workers=8)

# prepare data
data = np.random.rand(10_000, 1000)

# run
labels_delayed = foo_delayed(data).compute()  # ~64 sec
labels = foo(data) # ~64 sec

# close client
client.close()

Alternatively, dask offers an extensive API for machine learning tasks. For example, dask provides a direct API for KMeans that mimics that of sklearn. This is available through the dask-ml package and can be easily deployed to HPC clusters using the same API we saw earlier for SLURM clusters.

from dask_ml.cluster import KMeans

# define the clustering function
def get_kmeans_labels(n_clusters, data):
    # initialize kmeans with the requested number of clusters
    kmeans = KMeans(n_clusters=n_clusters).fit(data)
    # return the cluster label assigned to each sample
    return kmeans.labels_

# run on SLURM
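Deploying it to the cluster could then look like the following sketch, reusing the function above and the same assumed account and resources as earlier (the data here is random placeholder input; this only runs on a SLURM system):

```python
import numpy as np
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from dask_ml.cluster import KMeans

def get_kmeans_labels(n_clusters, data):
    kmeans = KMeans(n_clusters=n_clusters).fit(data)
    return kmeans.labels_

with SLURMCluster(
    account="def-accountname",
    cores=10,
    memory="10 GB"
) as cluster, Client(cluster) as client:
    # request one worker and run the clustering on it
    cluster.scale(1)
    data = np.random.rand(10_000, 4)
    labels = get_kmeans_labels(3, data)
```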

GPU nodes

Dask offers an interface for GPU applications on clusters via the dask-cuda package. However, at this stage, GPU support seems to be limited to Linux-based systems. Alternatively, other libraries, including PyTorch, offer APIs to communicate with clusters for distributed workloads. In these cases, we write a Python script that contains the model we want to run. Next, we need to provide SLURM with an executable bash script where we specify the type of resources we want to allocate for the workload.
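As a rough sketch, such a submission script might look like the following (the account name, module version, environment path, and script name are all placeholders; available modules vary per cluster):

```shell
#!/bin/bash
#SBATCH --account=def-accountname
#SBATCH --gres=gpu:1           # request one GPU on the node
#SBATCH --cpus-per-task=4
#SBATCH --mem=16G
#SBATCH --time=01:00:00

# load Python and activate a virtual environment with our dependencies
module load python
source ~/env/bin/activate

# run the script containing our model
python train.py
```

The script would then be submitted with `sbatch job.sh`, and SLURM takes care of allocating the requested GPU node.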

For more information on how to work with GPU-based tasks, join our next workshop on March 24th!